\section{Task Management Subsystem}
\label{sec:taskManagementSubsystem}
\index{Subsystem!Task Management}

Package: \texttt{org.glite.rgma.server.services.tasks}

\subsection{Overview}
A task is a sequence of operations that can be queued to run at some 
time in the future. A task will be tried and, if it fails, may be retried as 
explained below.  The tasks are processed by threads in a pool. Each task has a 
key value that is associated with the resource that the task uses. This key is 
used to ensure that the finite thread pool does not have many threads trying to 
make use of a malfunctioning resource.

When tasks are submitted to the \texttt{TaskManager} they are added to a queue 
to await processing. The \texttt{TaskManager} has a pool of 
\texttt{TaskInvocators} which periodically pop a task off the queue to process 
it.  The processing algorithm establishes a set of ``good'' keys. Only if a key 
is in the good key set will more than one task be run at a time with that key. 
If the task's key is not in the goodKey set and any other \texttt{TaskInvocator} 
is using that key, the task is re-queued.  Only when an invocation is successful 
on the \textit{first} attempt is the key added to the goodKey set. If a task 
fails, its key is removed from the goodKey set. If a task returns a 
\texttt{HARD\_ERROR} then it and all other tasks on the queue with the same key 
are removed from the queue.

To ensure that threads are always available to process tasks using resources 
that are performing well, some threads will be configured to only take tasks 
with a key in the goodKey set.
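
The admission rule described above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not the R-GMA implementation: the class name \texttt{AdmissionSketch}, the method \texttt{canProcess} and the use of plain, unsynchronized collections are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the key-based admission rule; not the actual R-GMA code. */
final class AdmissionSketch {
    /**
     * Decides whether an invocator may run a task with the given key.
     * goodKeys: keys currently regarded as working well.
     * currentTasks: number of invocators currently running a task, per key.
     * goodOnly: true if the invocator only takes tasks whose key is good.
     */
    static boolean canProcess(String key, Set<String> goodKeys,
                              Map<String, Integer> currentTasks, boolean goodOnly) {
        if (goodKeys.contains(key)) {
            return true; // a good key may be used by several invocators at once
        }
        // A key not known to be good is tried by at most one 'any' invocator at a time.
        return !goodOnly && !currentTasks.containsKey(key);
    }

    public static void main(String[] args) {
        Set<String> goodKeys = new HashSet<>();
        Map<String, Integer> currentTasks = new HashMap<>();
        goodKeys.add("http://good.example/");
        currentTasks.put("http://busy.example/", 1);
        System.out.println(canProcess("http://good.example/", goodKeys, currentTasks, true));
        System.out.println(canProcess("http://busy.example/", goodKeys, currentTasks, false));
    }
}
```

A task for which \texttt{canProcess} returns false would be put back on the queue rather than invoked.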

Timers are used to ensure that a task does not run indefinitely. 
There is a timer in each \texttt{TaskInvocator} that will interrupt a task if 
it overruns. There is another timer in the \texttt{TaskManager} that will 
abandon a hung \texttt{TaskInvocator} and create a new one if necessary.

An algorithm has been devised to ensure the \texttt{TaskInvocators} do not 
continually loop furiously through the queue when the only tasks in the queue 
are tasks that cannot currently be run.

The \textbf{user} of the \texttt{TaskManager} must provide a class that extends 
the \texttt{Task} class and provides an implementation of the \texttt{invoke()} 
method. The user should set a sensible value for \texttt{maxRunTimeMillis} in the 
\texttt{Task} constructor; this is the time period that the task will be allowed to run 
before the \texttt{TaskManager} considers the task to have hung and interrupts 
it. A value for the maximum attempt count must also be supplied; it is 
recommended that this is set to 2. An instance of the class extending 
\texttt{Task} needs to be added to the singleton \texttt{TaskManager}. 
\textit{The user should keep a reference to the task if they wish to 
interrogate it}. On completion, the \texttt{invoke()} implementation should ensure 
that it returns \texttt{SUCCESS}, \texttt{SOFT\_ERROR} or \texttt{HARD\_ERROR}. 
If \texttt{SOFT\_ERROR} is returned, the \texttt{TaskManager} will re-queue the 
task for retry.

The task is created with a zero \texttt{attemptNumber}. Each time the task is 
invoked, the \texttt{attemptNumber} is incremented by the 
\texttt{TaskInvocator}. It is the responsibility of the class extending \texttt{Task} to 
make use of the \texttt{attemptNumber} and use an appropriate strategy to 
achieve success or to return a \texttt{HARD\_ERROR} after a finite number of 
attempts.
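
A task that uses its attempt number to give up after a finite number of attempts might look like the following sketch. The classes \texttt{SketchTask} and \texttt{ContactTask}, the driver \texttt{runToCompletion} and the example values are hypothetical stand-ins introduced only for illustration; the real \texttt{Task} class and \texttt{TaskInvocator} may differ. The driver counts the attempt before invoking, which is an assumption taken from the \texttt{run} pseudocode of the \texttt{TaskInvocator}.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-ins for illustration; the real Task API may differ. */
final class RetrySketch {
    enum Result { SUCCESS, SOFT_ERROR, HARD_ERROR }

    /** Minimal stand-in for the abstract Task class described in this section. */
    abstract static class SketchTask {
        final String ownerId;
        final String key;
        final long maxRunTimeMillis;
        final int maxAttemptCount;
        int attemptNumber = 0; // incremented by the invoking code, not by the task

        SketchTask(String ownerId, String key, long maxRunTimeMillis, int maxAttemptCount) {
            this.ownerId = ownerId;
            this.key = key;
            this.maxRunTimeMillis = maxRunTimeMillis;
            this.maxAttemptCount = maxAttemptCount;
        }

        abstract Result invoke();
    }

    /** A task that returns HARD_ERROR once its attempts are used up. */
    static final class ContactTask extends SketchTask {
        private final BooleanSupplier operation; // the work; true means it succeeded

        ContactTask(String ownerId, String key, BooleanSupplier operation) {
            super(ownerId, key, 5000L, 2); // example values only
            this.operation = operation;
        }

        @Override
        Result invoke() {
            if (operation.getAsBoolean()) {
                return Result.SUCCESS;
            }
            return attemptNumber < maxAttemptCount ? Result.SOFT_ERROR : Result.HARD_ERROR;
        }
    }

    /** Drives a task roughly as an invocator might: count the attempt, invoke, retry on SOFT_ERROR. */
    static Result runToCompletion(SketchTask task) {
        Result result;
        do {
            task.attemptNumber++;
            result = task.invoke();
        } while (result == Result.SOFT_ERROR && task.attemptNumber < task.maxAttemptCount);
        // A task still reporting SOFT_ERROR after its last attempt is treated as failed.
        return result == Result.SOFT_ERROR ? Result.HARD_ERROR : result;
    }

    public static void main(String[] args) {
        SketchTask failing = new ContactTask("owner-1", "http://example.org/service", () -> false);
        Result result = runToCompletion(failing);
        System.out.println(result + " after " + failing.attemptNumber + " attempts");
    }
}
```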

The \texttt{TaskManager} also offers a look up mechanism against an 
\texttt{ownerId} provided by the user when creating the task. This ownerId 
associated with the task is not unique and is intended for diagnostics.

\subsection{Task}
Task is an abstract class with an \texttt{invoke()} method that must be 
implemented to carry out the task. The derived class must encapsulate all data 
and functionality required to support the \texttt{invoke()} call.  The 
constructor of the derived class must call the superclass constructor with the 
owner id, key, maximum run time in milliseconds and maximum attempt count as arguments.

If the task is still running after the \texttt{maxRunTimeMillis} then it will 
be sent an interrupt signal from the \texttt{TaskInvocator}. Upon receiving an 
interrupt the task should exit gracefully. Once an interrupt is sent the task 
will be regarded as having returned a hard error irrespective of the value it 
returns.

\paragraph{Task(ownerId, key, maxRunTimeMillis, maxAttemptCount)}
Public constructor that takes the specified ownerId and key as strings, 
maxRunTimeMillis as a long and maxAttemptCount as an int.
\begin{code}
\item ownerId - owner associated with the task, intended for diagnostics
\item key - an identifier of the resource to be used, e.g. a URL
\item maxRunTimeMillis - time period in milliseconds after which the task will 
be interrupted
\item maxAttemptCount - the number of times that the TaskInvocator will 
attempt a task before it sets the task's ResultCode to \texttt{HARD\_ERROR}
\end{code}

\begin{code}
\item attemptNumber = 0
\end{code}

\paragraph{abstract invoke()}
Invokes this task and returns a status code (\texttt{SUCCESS}, \texttt{SOFT\_ERROR} 
or \texttt{HARD\_ERROR}).

\paragraph{abort()}This is a public method that marks the task as aborted by 
setting the status code to \texttt{HARD\_ERROR}, only if the current status is 
\texttt{NULL} or \texttt{SOFT\_ERROR}.

\paragraph{getCurrentAttemptNumber()}This is a protected method that returns 
the number of times that the task has been invoked.

\paragraph{getResultCode()}This is a public method that returns the result 
status. This may be called at any time. When a task is created the status is 
NULL. Return values may be: \texttt{SUCCESS}, \texttt{SOFT\_ERROR}, 
\texttt{HARD\_ERROR} or \texttt{NULL}.

\paragraph{setResultCode(Result)}This is a \emph{synchronized} package method 
used to set the result status. If the result status is already \texttt{SUCCESS} 
or \texttt{HARD\_ERROR} then the status is not updated.
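
The rules for \texttt{abort()} and \texttt{setResultCode(Result)} amount to a small state machine in which \texttt{SUCCESS} and \texttt{HARD\_ERROR} are final. The following is a minimal sketch of that behaviour; the class \texttt{ResultCodeSketch} is hypothetical, and modelling the initial \texttt{NULL} status as a Java \texttt{null} reference is an assumption.

```java
/** Hypothetical sketch of the result-code rules; the unset status is modelled as null. */
final class ResultCodeSketch {
    enum Result { SUCCESS, SOFT_ERROR, HARD_ERROR }

    private Result resultCode; // null until a result has been recorded

    synchronized Result getResultCode() {
        return resultCode;
    }

    /** Records a result unless a final one (SUCCESS or HARD_ERROR) is already set. */
    synchronized void setResultCode(Result result) {
        if (resultCode == Result.SUCCESS || resultCode == Result.HARD_ERROR) {
            return;
        }
        resultCode = result;
    }

    /** Marks the task as aborted; a task with a final result is left unchanged. */
    synchronized void abort() {
        if (resultCode == null || resultCode == Result.SOFT_ERROR) {
            resultCode = Result.HARD_ERROR;
        }
    }

    public static void main(String[] args) {
        ResultCodeSketch task = new ResultCodeSketch();
        task.setResultCode(Result.SOFT_ERROR);
        task.abort();
        task.setResultCode(Result.SUCCESS); // ignored: HARD_ERROR is final
        System.out.println(task.getResultCode());
    }
}
```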

\paragraph{statusInfo()}This is a public method that returns a map of status 
information, used for monitoring purposes.

\paragraph{toString()}
This is a public method that returns the key and owner id of the task.

\subsection{TaskManager}
This class is a singleton. It uses data structures and threads as indicated in 
the constructor.

\paragraph{Constructor} The constructor is private and throws \texttt{RGMAConfigurationException}. It performs the following steps:

\begin{code}
\item create the task queue - a linked list for tasks needing to be run.
\item create the goodKey set - a \emph{synchronized} hash set for keys that are 
working well.
\item create the currentTasks map - a \emph{synchronized} hash map keyed on the 
task.key giving the count of the \texttt{TaskInvocators} currently processing 
tasks with that key.
\item create a pool of \texttt{TaskInvocators}. Some of these are able to deal 
with any task and some only with tasks with key in the goodKey set. The values 
are taken from a configuration file containing the total number of threads and 
the number for good only threads. There must be at least one 
\texttt{TaskInvocator} that can invoke any task.
\item create a taskInvocatorDataList - a \emph{synchronized} hash map keyed on 
the TaskInvocator.Thread.currentThread. These data are used by the 
\texttt{InteruptHangingTaskInvocators} timer.
\item create a firstRejectedTaskAnyInvocator queue - a linked list that will 
contain either null or a single entry, the first task encountered that an `Any' 
\texttt{TaskInvocator} cannot process due to the task's key.
\item create a firstRejectedTaskGoodOnlyInvocator queue - a linked list that 
will contain either null or a single entry, the first task encountered that a `Good 
Only' \texttt{TaskInvocator} cannot process due to the task's key.
\item create \texttt{InteruptHangingTaskInvocators} timer - a timer thread that 
will check for hung TaskInvocators.
\item initialize lastTaskTime, successfulTasks and failedTasks.
\end{code}

\paragraph{getInstance()}
This is a \emph{synchronized} public static method that returns the singleton 
instance, creating it if necessary.

\paragraph{add(task)}
This is a public method that adds a task to the queue. Requires 
\emph{synchronization} on the task queue.

\paragraph{getGoodKeys()}
This is a public method that returns a list of keys designated as good keys by 
this \texttt{TaskManager}.

\paragraph{getNotGoodKeys()}
This is a public method that returns a list of keys not designated as good keys 
for queued and running tasks. Requires \emph{synchronization} on the task queue.

\paragraph{getQueuedTasks()}
This is a public method that returns a list of tasks that are currently queued 
awaiting execution. Requires \emph{synchronization} on the task queue.

\paragraph{getTasks(ownerId)}
This is a public method that returns a list of queued tasks, and their status, 
that have been added to the \texttt{TaskManager} by the specified owner id. 
Requires \emph{synchronization} on the task queue.

\paragraph{getTasksPerKey()}
This is a public method that returns a map, keyed on the task.key, giving the 
number of queued and running tasks associated with each key. Requires 
\emph{synchronization} on the task queue.
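
A per-key count of this kind could be built as in the sketch below. The class \texttt{TasksPerKeySketch}, its method signature and the representation of the queue as a list of keys are assumptions made for the example, not the actual \texttt{TaskManager} code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of counting queued and running tasks per key. */
final class TasksPerKeySketch {
    /**
     * queuedKeys holds the key of every queued task (one entry per task);
     * runningPerKey holds the number of running tasks for each key.
     */
    static Map<String, Integer> tasksPerKey(List<String> queuedKeys,
                                            Map<String, Integer> runningPerKey) {
        Map<String, Integer> counts = new HashMap<>(runningPerKey);
        synchronized (queuedKeys) { // stands in for synchronization on the task queue
            for (String key : queuedKeys) {
                counts.merge(key, 1, Integer::sum); // add one per queued task
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> queued = new ArrayList<>();
        queued.add("a");
        queued.add("a");
        queued.add("b");
        Map<String, Integer> running = new HashMap<>();
        running.put("a", 1);
        System.out.println(tasksPerKey(queued, running));
    }
}
```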

\paragraph{statusInfo()}
This is a public method that returns a map of the status information where each 
property serves as a key pointing to its value.


\subsubsection{InteruptHangingTaskInvocators}
This is a timer thread created by the \texttt{TaskManager} that will check for hung 
\texttt{TaskInvocators}. It will try to clean up a hung \texttt{TaskInvocator} 
before creating a new one.

\paragraph{Constructor(threadInterruptDelayMillis)} Takes a time period in 
milliseconds that gives the \texttt{TaskInvocator} a chance to clean up a 
task, after the task has timed out, before the \texttt{TaskInvocator} itself 
is abandoned.

\subsection{TaskInvocator}
Pops a task from the queue and processes it.

\paragraph{Constructor}
The constructor takes seven arguments: the currentTasks map, 
firstRejectedTaskAnyInvocator, firstRejectedTaskGoodOnlyInvocator, the goodKey set, 
goodOnlyInvocator (a boolean which, if true, causes the thread to process only 
tasks with a key in the goodKey set), taskInvocatorDataList and the task 
queue.

Access to all queues and sets needs to be synchronized. Note that it is 
important to give up locks as quickly as possible and, in particular, not to hold locks while 
a task is being invoked.

\pagebreak
\paragraph{run}

\begin{code}
  \item while not interrupted:
  \begin{code}
    \item synchronize on queue:
    \begin{code}
      \item if queue is empty: queue.wait()
      \item pop task from queue
    \end{code}
    \item if task not aborted: \# i.e. not HARD\_ERROR
      \begin{code}
      \item synchronize on currentTask map
      \begin{code}
        \item if (task.key in goodKey set) or
        \item (not goodOnly TaskInvocator and task.key not in currentTask map):
        \begin{code}
          \item add task.key to currentTask map
          \item set process flag
        \end{code}
      \end{code}
      \item if not process flag:
      \begin{code}
        \item requeue the task
      \end{code}
      \item else:
      \begin{code}
        \item increment task.attemptNumber      
        \item start timer \# send interrupt to task after maxRunTimeMillis
        \item result = task.invoke
        \item update lastTaskTime
        \item if task interrupted: result = HARD\_ERROR
        \item if task.getResultCode aborted: \# i.e. HARD\_ERROR
        \begin{code}
          \item remove task.key from currentTask map
        \end{code}
        \item else:
        \begin{code}
          \item task.resultCode = result
          \item if result = SUCCESS:
          \begin{code}
            \item if task.attemptNumber = 1: add task.key to goodKey set
            \item else: remove task.key from goodKey set
            \item remove task.key from currentTask map
            \item increment successfulTasks
          \end{code}
          \item else if result = SOFT\_ERROR:
          \begin{code}
            \item remove task.key from goodKey set
            \item remove task.key from currentTask map
            \item if not task aborted: requeue the task
          \end{code}
          \item else: \# HARD\_ERROR
          \begin{code}
            \item remove task.key from goodKey set
            \item removeSimilarTasks
            \item remove task.key from currentTask map
            \item increment failedTasks
          \end{code}
        \end{code}
      \end{code}
    \end{code}
  \end{code}
\end{code}
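
The way the pseudocode above maintains the goodKey set reduces to a single rule: a key is good only after a success on the first attempt, and any other outcome removes it. A minimal Java sketch of just that rule follows; the class \texttt{GoodKeySketch} and the method \texttt{updateGoodKeys} are hypothetical names, and locking is omitted.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of how the good key set changes after an invocation. */
final class GoodKeySketch {
    enum Result { SUCCESS, SOFT_ERROR, HARD_ERROR }

    /**
     * attemptNumber is the task's attempt count including the invocation
     * that produced the given result.
     */
    static void updateGoodKeys(Set<String> goodKeys, String key,
                               Result result, int attemptNumber) {
        if (result == Result.SUCCESS && attemptNumber == 1) {
            goodKeys.add(key); // first-attempt success: the resource is working well
        } else {
            goodKeys.remove(key); // late success, SOFT_ERROR or HARD_ERROR
        }
    }

    public static void main(String[] args) {
        Set<String> goodKeys = new HashSet<>();
        updateGoodKeys(goodKeys, "http://example.org/", Result.SUCCESS, 1);
        System.out.println(goodKeys.contains("http://example.org/"));
        updateGoodKeys(goodKeys, "http://example.org/", Result.SOFT_ERROR, 1);
        System.out.println(goodKeys.contains("http://example.org/"));
    }
}
```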

\subsection{Dealing with hanging tasks} Tasks are added to the task queue via 
the \texttt{TaskManager} and run by one of many \texttt{TaskInvocators}. If 
the task fails to complete in the allocated time 
(\texttt{task.maxRunTimeMillis}), a timer task created by the 
\texttt{TaskInvocator} will try to interrupt the task. If the interrupt 
succeeds, the \texttt{TaskInvocator} will set the task's ResultCode to 
\texttt{HARD\_ERROR}. All 
other tasks on the task queue with the same key will also have their 
ResultCodes set to \texttt{HARD\_ERROR}.
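
The per-task timer can be illustrated with \texttt{java.util.Timer}. The sketch below is not the R-GMA code: the class \texttt{InterruptTimerSketch} and the method \texttt{runWithTimeout} are hypothetical, and the sketch ignores the small race in which the timer fires just as the work returns.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of interrupting work that exceeds its allowed run time. */
final class InterruptTimerSketch {
    /** Runs the work on the calling thread; returns true if the timer interrupted it. */
    static boolean runWithTimeout(Runnable work, long maxRunTimeMillis) {
        final Thread worker = Thread.currentThread();
        final AtomicBoolean timedOut = new AtomicBoolean(false);
        Timer timer = new Timer(true); // daemon timer thread
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                timedOut.set(true);
                worker.interrupt(); // ask the overrunning work to stop
            }
        }, maxRunTimeMillis);
        try {
            work.run();
        } finally {
            timer.cancel();       // no interrupt is wanted once the work has returned
            Thread.interrupted(); // clear any pending interrupt flag on this thread
        }
        return timedOut.get();
    }

    public static void main(String[] args) {
        boolean overran = runWithTimeout(() -> {
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                // exit gracefully on interrupt
            }
        }, 100L);
        // An interrupted task is treated as HARD_ERROR whatever it returns.
        System.out.println(overran ? "HARD_ERROR" : "finished in time");
    }
}
```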

\begin{sloppypar}
The \texttt{InteruptHangingTaskInvocators} timer task of the \texttt{TaskManager}
periodically checks the \texttt{taskInvocatorDataList} for
\texttt{TaskInvocators} running tasks that have been running for
\texttt{task.maxRunTimeMillis} +
\texttt{InteruptHangingTaskInvocators.threadInterruptDelayMillis}. Such
\texttt{TaskInvocators} are deemed to have hung. The
\texttt{InteruptHangingTaskInvocators} will set the task's ResultCode to
\texttt{HARD\_ERROR} and all other tasks on the task queue with the same key will
also have their ResultCodes set to \texttt{HARD\_ERROR}. \emph{N.B.}  A new
\texttt{TaskInvocator} will then be created to replace the hung one. If the old
\texttt{TaskInvocator} regains control from the task it will notice that it no
longer has an entry in the \texttt{taskInvocatorDataList} and die gracefully.
\end{sloppypar}
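
The hang check itself is a comparison of elapsed run time against the sum of the two time limits. The following sketch shows one way to express it; \texttt{HangCheckSketch}, \texttt{InvocatorData} and \texttt{findHung} are hypothetical names, the map is keyed on an invocator name for simplicity, and the strict ``greater than'' comparison is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of detecting hung invocators from recorded start times. */
final class HangCheckSketch {
    /** What one invocator is currently running. */
    static final class InvocatorData {
        final long taskStartMillis;
        final long maxRunTimeMillis;

        InvocatorData(long taskStartMillis, long maxRunTimeMillis) {
            this.taskStartMillis = taskStartMillis;
            this.maxRunTimeMillis = maxRunTimeMillis;
        }
    }

    /** Returns the names of invocators whose task has exceeded its limit plus the grace period. */
    static List<String> findHung(Map<String, InvocatorData> dataByInvocator,
                                 long nowMillis, long threadInterruptDelayMillis) {
        List<String> hung = new ArrayList<>();
        for (Map.Entry<String, InvocatorData> entry : dataByInvocator.entrySet()) {
            InvocatorData data = entry.getValue();
            long allowedMillis = data.maxRunTimeMillis + threadInterruptDelayMillis;
            if (nowMillis - data.taskStartMillis > allowedMillis) {
                hung.add(entry.getKey());
            }
        }
        return hung;
    }

    public static void main(String[] args) {
        Map<String, InvocatorData> data = new LinkedHashMap<>();
        data.put("invocator-1", new InvocatorData(0L, 1000L));
        data.put("invocator-2", new InvocatorData(5000L, 1000L));
        System.out.println(findHung(data, 6500L, 500L));
    }
}
```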

\subsection{Avoiding TaskInvocators furiously looping around the task queue}
Under certain conditions it would be possible for a \texttt{TaskInvocator} to 
loop furiously around the task queue. This could happen if none of the tasks in 
the task queue had keys in the goodKey set and there was one occurrence of each 
queued task.key already being executed by other \texttt{TaskInvocators}. It 
would also occur if the \texttt{TaskInvocator} was a goodOnlyInvocator and none 
of the tasks in the queue had keys in the goodKey set.

In order to guard against this a record is kept of the first task encountered 
which could not be run and was put back onto the queue. When any 
\texttt{TaskInvocator} gets back round to this `first' rejected task it will go 
to sleep. At any point when a task is added to the queue, removed from the 
queue for processing, or removed from the currentTasks map, then the record of 
the first rejected task will be set to null and a notification sent to the 
sleeping \texttt{TaskInvocators}. As the \texttt{TaskInvocators} also have to 
sleep when the task queue is empty the same wake up trigger is used.

A further complication is that there are two types of 
\texttt{TaskInvocator}: `Any' and `Good Only'. Therefore, two variables are 
needed to record the `first' rejected task for the \texttt{TaskInvocators}: 
firstRejectedTaskAnyInvocator and firstRejectedTaskGoodOnlyInvocator.
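
The sleep and wake-up mechanism can be sketched for a single type of invocator as follows. This is an illustration under stated assumptions rather than the actual design: the class \texttt{RejectedTaskSketch} and its methods are hypothetical, one lock guards everything, and a real implementation would keep one record per invocator type as described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the first-rejected-task record for one invocator type. */
final class RejectedTaskSketch<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private T firstRejected; // null when no lap around the queue is being tracked

    /** Adds a new task; this may make progress possible, so sleepers are woken. */
    synchronized void add(T task) {
        queue.addLast(task);
        stateChanged();
    }

    /** Puts back a task that cannot run now, remembering the first such task. */
    synchronized void requeueRejected(T task) {
        if (firstRejected == null) {
            firstRejected = task;
        }
        queue.addLast(task);
    }

    /** Call when a task starts processing or a key leaves the current-tasks map. */
    synchronized void stateChanged() {
        firstRejected = null;
        notifyAll();
    }

    /** True if an invocator should sleep: nothing queued, or a full lap found nothing runnable. */
    synchronized boolean wouldSleep() {
        return queue.isEmpty() || queue.peekFirst() == firstRejected;
    }

    /** Non-blocking removal of the head of the queue; null if the queue is empty. */
    synchronized T poll() {
        return queue.pollFirst();
    }

    /** Blocking variant: sleeps until there is a task worth examining. */
    synchronized T next() throws InterruptedException {
        while (wouldSleep()) {
            wait();
        }
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        RejectedTaskSketch<String> tasks = new RejectedTaskSketch<>();
        tasks.add("A");
        tasks.add("B");
        tasks.requeueRejected(tasks.poll()); // A cannot run; it becomes the first rejected task
        tasks.requeueRejected(tasks.poll()); // B cannot run either
        System.out.println(tasks.wouldSleep()); // the head is A again
    }
}
```

Because the empty-queue case and the full-lap case share one condition in \texttt{wouldSleep}, a single \texttt{notifyAll} serves as the wake-up trigger for both.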
